
Flume overview
Flume is a distributed, reliable,
and available service for efficiently collecting,
aggregating, and moving large amounts of log data.

Purpose:
    Efficiently collect, aggregate, and move large amounts of log data


webserver (source)  ===>  flume  ===>  hdfs (destination)


Design goals:
	Reliability
	Scalability
	Manageability


Comparison with similar products in the industry
	(***)Flume: Cloudera/Apache   Java
	Scribe: Facebook   C/C++   no longer maintained
	Chukwa: Yahoo/Apache  Java  no longer maintained
	Kafka:
	Fluentd: Ruby
	(***)Logstash: ELK (with Elasticsearch and Kibana)


Flume history
	Cloudera   0.9.2   Flume-OG
	flume-728  Flume-NG  ==> Apache 
	2012.7  1.0
	2015.5  1.6   (*** + )
	~		1.7


Flume architecture and core components
1) Source   collects data

2) Channel  aggregates (buffers) data

3) Sink     outputs data
 




Flume installation prerequisites
    Java Runtime Environment - Java 1.7 or later
    Memory - Sufficient memory for configurations used by sources, channels or sinks
    Disk Space - Sufficient disk space for configurations used by channels or sinks
    Directory Permissions - Read/Write permissions for directories used by agent


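Install the JDK
	Download the JDK
	Extract it to ~/app
	Add Java to the environment variables in ~/.bash_profile:
		export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
		export PATH=$JAVA_HOME/bin:$PATH
	Run source ~/.bash_profile so the settings take effect
	Verify: java -version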


Install Flume
	Download apache-flume-1.6.0-cdh5.7.0
	Extract it to ~/app
	Add Flume to the environment variables in ~/.bash_profile:
		export FLUME_HOME=/home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin
		export PATH=$FLUME_HOME/bin:$PATH
	Run source ~/.bash_profile so the settings take effect
	In $FLUME_HOME/conf/flume-env.sh, set: export JAVA_HOME=/home/hadoop/app/jdk1.8.0_144
	Verify: flume-ng version


example.conf: A single-node Flume configuration

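The key to using Flume is writing the configuration file

A) Configure the source
B) Configure the channel
C) Configure the sink
D) Wire the three components together

a1: agent name
r1: source name
k1: sink name
c1: channel name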

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop000
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
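
Because every property key repeats the agent and component names, a name listed in "<agent>.sources" that never gets a matching ".type" line is an easy mistake to make. The following is a minimal sketch of a sanity check, not part of Flume itself; the function name check_sources and its arguments are hypothetical, and it only looks at sources (sinks and channels could be checked the same way).

```shell
# Hypothetical helper, not a Flume command.
# Usage: check_sources AGENT_NAME CONF_FILE
# Returns 0 if every source listed in "<agent>.sources" has a ".type" line.
check_sources() {
  agent="$1"
  conf="$2"
  rc=0
  # Extract the space-separated source names from "<agent>.sources = ..."
  names=$(sed -n "s/^$agent\.sources[[:space:]]*=[[:space:]]*//p" "$conf")
  for name in $names; do
    if ! grep -q "^$agent\.sources\.$name\.type[[:space:]]*=" "$conf"; then
      echo "source '$name' is declared but has no type" >&2
      rc=1
    fi
  done
  return $rc
}
```

For example: check_sources a1 $FLUME_HOME/conf/example.conf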


Start the agent
flume-ng agent \
--name a1  \
--conf $FLUME_HOME/conf  \
--conf-file $FLUME_HOME/conf/example.conf \
-Dflume.root.logger=INFO,console

Test with telnet: telnet hadoop000 44444


Event: { headers:{} body: 68 65 6C 6C 6F 0D hello. }
An Event is the basic unit of data transfer in Flume
Event = optional headers + byte array
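
The hex digits in the logger output are the raw bytes of the event body. As a small illustration, the sketch below turns such a hex dump back into text; decode_hex is a hypothetical helper written for these notes, not a Flume tool. The trailing 0D in the sample is most likely the carriage return that telnet sends at the end of a line, which the logger sink shows as "." because it is not printable.

```shell
# Hypothetical helper: decode a space-separated hex dump into its bytes.
decode_hex() {
  for byte in $1; do
    # Convert each hex byte to a 3-digit octal escape and let printf emit it.
    printf "\\$(printf '%03o' "0x$byte")"
  done
}

# Body from the sample event, without the trailing 0D (carriage return)
decode_hex "68 65 6C 6C 6F"
echo
```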




tail -F /home/hadoop/data/data.log

Monitor a file for changes and print the newly appended content



Requirement 2:
Agent selection: exec source + memory channel + logger sink
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/hadoop/data/data.log
a1.sources.r1.shell = /bin/sh -c

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start the agent
flume-ng agent \
--name a1  \
--conf $FLUME_HOME/conf  \
--conf-file $FLUME_HOME/conf/exec-memory-logger.conf \
-Dflume.root.logger=INFO,console
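
To see the exec source pick up changes, lines can be appended to the tailed file from a second terminal while the agent is running. A minimal sketch follows; append_test_line is a hypothetical helper name, and the directory is created if needed since tail -F keeps retrying until the file appears.

```shell
# Hypothetical helper: append one line to the log file that the exec source tails.
# Usage: append_test_line LOG_FILE MESSAGE
append_test_line() {
  log_file="$1"
  message="$2"
  # Make sure the parent directory exists before appending
  mkdir -p "$(dirname "$log_file")"
  printf '%s\n' "$message" >> "$log_file"
}
```

For example: append_test_line /home/hadoop/data/data.log "hello flume"
Each appended line should then show up as one Event in the agent console.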


Requirement 3:
Technology selection:
	exec source + memory channel + avro sink
	avro source + memory channel + logger sink

exec-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c

exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel



avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel



Start avro-memory-logger first, then start exec-memory-avro
flume-ng agent \
--name avro-memory-logger  \
--conf $FLUME_HOME/conf  \
--conf-file $FLUME_HOME/conf/avro-memory-logger.conf \
-Dflume.root.logger=INFO,console


flume-ng agent \
--name exec-memory-avro  \
--conf $FLUME_HOME/conf  \
--conf-file $FLUME_HOME/conf/exec-memory-avro.conf \
-Dflume.root.logger=INFO,console









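If the avro sink cannot connect to the avro source (for example, because the downstream agent is not running yet), the sending agent logs an error like:
org.apache.flume.EventDeliveryException: Failed to send events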
Caused by: org.apache.flume.FlumeException:
    NettyAvroRpcClient { host: localhost, port: 5000 }: RPC connection error
Caused by: java.io.IOException: Error connecting to /127.0.0.1:5000
Caused by: java.net.ConnectException: Connection refused



Assignment 1
exec1-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data1.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c


exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel



exec2-memory-avro.conf

exec-memory-avro.sources = exec-source
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data2.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c


exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sinks.avro-sink.channel = memory-channel
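
The two exec agents in this assignment differ only in the log file they tail, so the files could also be generated from one template. The sketch below is one possible way to do that; write_exec_avro_conf is a hypothetical helper, and the sink host and port are simply the values used in these notes.

```shell
# Hypothetical helper: write an exec -> memory -> avro agent config.
# Usage: write_exec_avro_conf AGENT_NAME LOG_FILE OUTPUT_CONF
write_exec_avro_conf() {
  agent="$1"
  log_file="$2"
  out="$3"
  cat > "$out" <<EOF
$agent.sources = exec-source
$agent.sinks = avro-sink
$agent.channels = memory-channel

$agent.sources.exec-source.type = exec
$agent.sources.exec-source.command = tail -F $log_file
$agent.sources.exec-source.shell = /bin/sh -c

$agent.sinks.avro-sink.type = avro
$agent.sinks.avro-sink.hostname = hadoop000
$agent.sinks.avro-sink.port = 44444

$agent.channels.memory-channel.type = memory

$agent.sources.exec-source.channels = memory-channel
$agent.sinks.avro-sink.channel = memory-channel
EOF
}
```

For example: write_exec_avro_conf exec-memory-avro /home/hadoop/data/data1.log exec1-memory-avro.conf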





avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel




Assignment 2

exec-memory-avro.conf

exec-memory-avro.sources = exec-source  exec-source1
exec-memory-avro.sinks = avro-sink
exec-memory-avro.channels = memory-channel

exec-memory-avro.sources.exec-source.type = exec
exec-memory-avro.sources.exec-source.command = tail -F /home/hadoop/data/data1.log
exec-memory-avro.sources.exec-source.shell = /bin/sh -c


exec-memory-avro.sources.exec-source1.type = exec
exec-memory-avro.sources.exec-source1.command = tail -F /home/hadoop/data/data2.log
exec-memory-avro.sources.exec-source1.shell = /bin/sh -c


exec-memory-avro.sinks.avro-sink.type = avro
exec-memory-avro.sinks.avro-sink.hostname = hadoop000
exec-memory-avro.sinks.avro-sink.port = 44444

exec-memory-avro.channels.memory-channel.type = memory

exec-memory-avro.sources.exec-source.channels = memory-channel
exec-memory-avro.sources.exec-source1.channels = memory-channel

exec-memory-avro.sinks.avro-sink.channel = memory-channel



avro-memory-logger.conf
avro-memory-logger.sources = avro-source
avro-memory-logger.sinks = logger-sink
avro-memory-logger.channels = memory-channel

avro-memory-logger.sources.avro-source.type = avro
avro-memory-logger.sources.avro-source.bind = hadoop000
avro-memory-logger.sources.avro-source.port = 44444

avro-memory-logger.sinks.logger-sink.type = logger

avro-memory-logger.channels.memory-channel.type = memory

avro-memory-logger.sources.avro-source.channels = memory-channel
avro-memory-logger.sinks.logger-sink.channel = memory-channel
